Tidy up messaging to device backends.
Add message response handlers indexed by message id instead of
relying on a queue of deferreds.
Tidy up error handling so that deferred errors are caught.
import traceback
from twisted.internet import defer
+defer.Deferred.debug = 1
from twisted.internet import reactor
import xen.lowlevel.xc; xc = xen.lowlevel.xc.new()
import os
from twisted.internet import defer
+defer.Deferred.debug = 1
import xen.lowlevel.xc; xc = xen.lowlevel.xc.new()
import xen.util.ip
raise VmError('unknown image type: ' + image_name)
image_handler(self, image)
deferred = self.configure()
+ def cbok(x):
+ print 'vm_create> cbok', x
+ return x
+ def cberr(err):
+ self.destroy()
+ return err
+ deferred.addCallback(cbok)
+ deferred.addErrback(cberr)
except StandardError, ex:
# Catch errors, cleanup and re-raise.
self.destroy()
raise
- def cbok(x):
- print 'vm_create> cbok', x
- return x
- deferred.addCallback(cbok)
print 'vm_create<'
return deferred
self.name = d['name']
self.memory = d['memory']/1024
deferred = self.configure()
+ def cberr(err):
+ self.destroy()
+ return err
+ deferred.addErrback(cberr)
except StandardError, ex:
self.destroy()
raise
from twisted.internet import reactor
from twisted.internet import defer
+defer.Deferred.debug = 1
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import ClientFactory
import StringIO
from twisted.internet import defer
+defer.Deferred.debug = 1
from twisted.internet import reactor
from twisted.web import error
from twisted.web import resource
from twisted.internet import protocol
from twisted.internet import abstract
from twisted.internet import defer
+defer.Deferred.debug = 1
from xen.lowlevel import xu
"Invalid configuration")
try:
deferred = self.xd.domain_create(config)
- deferred.addCallback(self._cb_op_create, configstring, req)
+ deferred.addCallback(self._op_create_cb, configstring, req)
+ deferred.addErrback(self._op_create_err, req)
return deferred
except Exception, ex:
print 'op_create> Exception creating domain:'
# str(ex))
- def _cb_op_create(self, dominfo, configstring, req):
+ def _op_create_cb(self, dominfo, configstring, req):
"""Callback to handle deferred domain creation.
"""
dom = dominfo.id
out.close()
return val
+ def _op_create_err(self, err, req):
+ """Callback to handle errors in deferred domain creation.
+ """
+ print 'op_create> Deferred Exception creating domain:', err
+ req.setResponseCode(http.BAD_REQUEST, "Error creating domain")
+ return str(err)
+
def op_restore(self, op, req):
"""Restore a domain from file.
"""
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
from twisted.internet import defer
+defer.Deferred.debug = 1
from xen.xend import sxp
from xen.xend import PrettyPrint
self.majorTypes = [ CMSG_BLKIF_BE ]
self.subTypes = {
- CMSG_BLKIF_BE_CREATE : self.recv_be_create,
- CMSG_BLKIF_BE_CONNECT : self.recv_be_connect,
- CMSG_BLKIF_BE_VBD_CREATE : self.recv_be_vbd_create,
- CMSG_BLKIF_BE_VBD_GROW : self.recv_be_vbd_grow,
CMSG_BLKIF_BE_DRIVER_STATUS_CHANGED: self.recv_be_driver_status_changed,
}
self.attached = 1
dom domain
recreate if true it's a recreate (after xend restart)
"""
- d = self.addDeferred()
+ d = defer.Deferred()
blkif = self.getInstanceByDom(dom)
if blkif:
- self.callDeferred(blkif)
+ d.callback(blkif)
else:
blkif = BlkifController(self, dom)
self.addInstance(blkif)
if recreate:
- self.callDeferred(blkif)
+ d.callback(blkif)
else:
- blkif.send_be_create()
+ d1 = defer.Deferred()
+ d1.addCallback(self.respond_be_create, d)
+ d1.addErrback(d.errback)
+ blkif.send_be_create(response=d1)
return d
def getDomainDevices(self, dom):
for blkif in self.getInstances():
blkif.reattached()
- def recv_be_create(self, msg, req):
- #print 'recv_be_create>'
+ def respond_be_create(self, msg, d):
+ print 'respond_be_create>'
val = unpackMsg('blkif_be_create_t', msg)
blkif = self.getInstanceByDom(val['domid'])
- self.callDeferred(blkif)
+ d.callback(blkif)
- def recv_be_connect(self, msg, req):
- #print 'recv_be_create>'
+ def respond_be_connect(self, msg):
+ print 'respond_be_connect>', self
val = unpackMsg('blkif_be_connect_t', msg)
blkif = self.getInstanceByDom(val['domid'])
if blkif:
else:
pass
- def recv_be_vbd_create(self, msg, req):
- #print 'recv_be_vbd_create>'
+ def respond_be_vbd_create(self, msg, d):
+ print 'recv_be_vbd_create>', self
val = unpackMsg('blkif_be_vbd_create_t', msg)
blkif = self.getInstanceByDom(val['domid'])
if blkif:
- blkif.send_be_vbd_grow(val['vdevice'])
+ d1 = defer.Deferred()
+ d1.addCallback(self.respond_be_vbd_grow, d)
+ if d: d1.addErrback(d.errback)
+ blkif.send_be_vbd_grow(val['vdevice'], response=d1)
else:
pass
- def recv_be_vbd_grow(self, msg, req):
- #print 'recv_be_vbd_grow>'
+ def respond_be_vbd_grow(self, msg, d):
+ print 'recv_be_vbd_grow>', self
val = unpackMsg('blkif_be_vbd_grow_t', msg)
# Check status?
if self.attached:
- self.callDeferred(0)
+ if d:
+ d.callback(0)
else:
self.reattachDevice(val['domid'], val['vdevice'])
def recv_be_driver_status_changed(self, msg, req):
+ print 'recv_be_driver_status_changed>', self, req
val = unpackMsg('blkif_be_driver_status_changed_t', msg)
status = val['status']
if status == BLKIF_DRIVER_STATUS_UP and not self.attached:
"""
dev = self.addDevice(vdev, mode, segment)
if not dev: return -1
+ d = defer.Deferred()
if recreate:
- d = defer.Deferred()
d.callback(self)
else:
- self.send_be_vbd_create(vdev)
- d = self.factory.addDeferred()
+ d1 = defer.Deferred()
+ d1.addCallback(self.factory.respond_be_vbd_create, d)
+ d1.addErrback(d.errback)
+ self.send_be_vbd_create(vdev, response=d1)
return d
def destroy(self):
def cb_destroy(val):
self.send_be_destroy()
- d = self.factory.addDeferred()
+ d = defer.Deferred()
d.addCallback(cb_destroy)
- self.send_be_disconnect()
+ self.send_be_disconnect(response=d)
def destroyDevices(self):
for dev in self.getDevices():
self.attached = 0
for dev in self.devices.values():
dev.attached = 0
- self.send_be_vbd_create(vdev)
+ d1 = defer.Deferred()
+ d1.addCallback(self.factory.respond_be_vbd_create, None)
+ self.send_be_vbd_create(vdev, response=d1)
def reattachDevice(self, vdev):
"""Reattach a device, when the back-end control domain has changed.
'blkif_handle' : val['handle'],
'evtchn' : self.evtchn['port1'],
'shmem_frame' : val['shmem_frame'] })
- self.factory.writeRequest(msg)
- pass
+ d = defer.Deferred()
+ d.addCallback(self.factory.respond_be_connect)
+ self.factory.writeRequest(msg, response=d)
- def send_fe_interface_status_changed(self):
+ def send_fe_interface_status_changed(self, response=None):
msg = packMsg('blkif_fe_interface_status_changed_t',
{ 'handle' : 0,
'status' : BLKIF_INTERFACE_STATUS_CONNECTED,
'evtchn' : self.evtchn['port2'] })
- self.writeRequest(msg)
+ self.writeRequest(msg, response=response)
- def send_be_create(self):
+ def send_be_create(self, response=None):
msg = packMsg('blkif_be_create_t',
{ 'domid' : self.dom,
'blkif_handle' : 0 })
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)
- def send_be_disconnect(self):
+ def send_be_disconnect(self, response=None):
print '>BlkifController>send_be_disconnect>', 'dom=', self.dom
msg = packMsg('blkif_be_disconnect_t',
{ 'domid' : self.dom,
'blkif_handle' : 0 })
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)
- def send_be_destroy(self):
+ def send_be_destroy(self, response=None):
print '>BlkifController>send_be_destroy>', 'dom=', self.dom
msg = packMsg('blkif_be_destroy_t',
{ 'domid' : self.dom,
'blkif_handle' : 0 })
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)
- def send_be_vbd_create(self, vdev):
+ def send_be_vbd_create(self, vdev, response=None):
dev = self.devices[vdev]
msg = packMsg('blkif_be_vbd_create_t',
{ 'domid' : self.dom,
'blkif_handle' : 0,
'vdevice' : dev.vdev,
'readonly' : dev.readonly() })
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)
- def send_be_vbd_grow(self, vdev):
+ def send_be_vbd_grow(self, vdev, response=None):
dev = self.devices[vdev]
msg = packMsg('blkif_be_vbd_grow_t',
{ 'domid' : self.dom,
'extent.device' : dev.device,
'extent.sector_start' : dev.start_sector,
'extent.sector_length' : dev.nr_sectors })
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)
- def send_be_vbd_destroy(self, vdev):
+ def send_be_vbd_destroy(self, vdev, response=None):
print '>BlkifController>send_be_vbd_destroy>', 'dom=', self.dom, 'vdev=', vdev
PrettyPrint.prettyprint(self.sxpr())
dev = self.devices[vdev]
'blkif_handle' : 0,
'vdevice' : dev.vdev })
del self.devices[vdev]
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
from twisted.internet import defer
+defer.Deferred.debug = 1
import channel
from messages import msgTypeName
+DEBUG=0
+
+class OutOfOrderError(RuntimeError):
+ """Error reported when a response arrives out of order.
+ """
+ pass
+
+class Responder:
+ """Handler for a response to a message.
+ """
+
+ def __init__(self, mid, deferred):
+ """Create a responder.
+
+ mid message id of response to handle
+ deferred deferred object holding the callbacks
+ """
+ self.mid = mid
+ self.deferred = deferred
+
+ def responseReceived(self, msg):
+ if self.deferred.called: return
+ self.deferred.callback(msg)
+
+ def error(self, err):
+ if self.deferred.called: return
+ self.deferred.errback(err)
+
class CtrlMsgRcvr:
"""Abstract class for things that deal with a control interface to a domain.
self.dom = None
self.channel = None
self.idx = None
+ self.responders = []
+ # Timeout (in seconds) for deferreds.
+ self.timeout = 10
+
+ def setTimeout(self, timeout):
+ self.timeout = timeout
def requestReceived(self, msg, type, subtype):
"""Dispatch a request to handlers.
type major message type
subtype minor message type
"""
+ msgid = msg.get_header()['id']
+ if DEBUG:
+ print 'requestReceived>', self, msgid, msgTypeName(type, subtype)
method = self.subTypes.get(subtype)
if method:
method(msg, 1)
- else:
+ elif DEBUG:
print ('requestReceived> No handler: Message type %s %d:%d'
% (msgTypeName(type, subtype), type, subtype)), self
type major message type
subtype minor message type
"""
+ msgid = msg.get_header()['id']
+ if DEBUG:
+ print 'responseReceived>', self, msgid, msgTypeName(type, subtype)
+ if self.callResponders(msg):
+ return
method = self.subTypes.get(subtype)
if method:
method(msg, 0)
- else:
+ elif DEBUG:
print ('responseReceived> No handler: Message type %s %d:%d'
% (msgTypeName(type, subtype), type, subtype)), self
+ def addResponder(self, mid, deferred):
+ """Add a responder for a message id.
+ The deferred is called with callback(msg) when a response
+ with the given message id arrives. Responses are expected
+ to arrive in order of message id. When a response arrives,
+ waiting responders for messages with lower id have errback
+ called with an OutOfOrder error.
+
+ mid message id of response expected
+ deferred a Deferred to handle the response
+
+ returns Responder
+ """
+ if self.timeout > 0:
+ deferred.setTimeout(self.timeout)
+ resp = Responder(mid, deferred)
+ self.responders.append(resp)
+ return resp
+
+ def callResponders(self, msg):
+ """Call any waiting responders for a response message.
+
+ msg response message
+
+ returns 1 if there was a responder for the message, 0 otherwise
+ """
+ hdr = msg.get_header()
+ mid = hdr['id']
+ handled = 0
+ while self.responders:
+ resp = self.responders[0]
+ if resp.mid > mid:
+ break
+ self.responders.pop()
+ if resp.mid < mid:
+ print 'handleResponse> Out of order:', resp.mid, mid
+ resp.error(OutOfOrderError())
+ else:
+ handled = 1
+ resp.responseReceived(msg)
+ break
+ return handled
+
def lostChannel(self):
"""Called when the channel to the domain is lost.
"""
"""Register interest in our major message types with the
channel to our domain.
"""
- #print 'CtrlMsgRcvr>registerChannel>', self
self.channel = self.channelFactory.domChannel(self.dom)
self.idx = self.channel.getIndex()
if self.majorTypes:
"""Deregister interest in our major message types with the
channel to our domain.
"""
- #print 'CtrlMsgRcvr>deregisterChannel>', self
if self.channel:
self.channel.deregisterDevice(self)
del self.channel
"""
return 0
- def writeRequest(self, msg):
+ def writeRequest(self, msg, response=None):
"""Write a request to the channel.
+
+ msg message
+ response Deferred to handle the response (optional)
"""
if self.channel:
+ if DEBUG: print 'CtrlMsgRcvr>writeRequest>', self, msg
+ if response:
+ self.addResponder(msg.get_header()['id'], response)
self.channel.writeRequest(msg)
else:
print 'CtrlMsgRcvr>writeRequest>', 'no channel!', self
"""Write a response to the channel.
"""
if self.channel:
+ if DEBUG: print 'CtrlMsgRcvr>writeResponse>', self, msg
self.channel.writeResponse(msg)
else:
print 'CtrlMsgRcvr>writeResponse>', 'no channel!', self
instances : mapping of index to controller instance
dlist : list of deferreds
dom : domain
- timeout : deferred timeout
"""
def __init__(self):
self.instances = {}
self.dlist = []
self.dom = 0
- # Timeout (in seconds) for deferreds.
- self.timeout = 10
def addInstance(self, instance):
"""Add a controller instance (under its index).
"""
self.delInstance(instance)
- def addDeferred(self):
- """Add a deferred object.
-
- returns deferred
- """
- d = defer.Deferred()
- if self.timeout > 0:
- # The deferred will error if not called before timeout.
- d.setTimeout(self.timeout)
- self.dlist.append(d)
- return d
-
- def callDeferred(self, *args):
- """Call the top deferred object
-
- args arguments
- """
- if self.dlist:
- d = self.dlist.pop(0)
- if not d.called:
- d.callback(*args)
-
- def errDeferred(self, *args):
- """Signal an error to the top deferred object.
-
- args arguments
- """
- if self.dlist:
- d = self.dlist.pop(0)
- if not d.called:
- d.errback(*args)
-
class Controller(CtrlMsgRcvr):
"""Abstract class for a device controller attached to a domain.
"""
class Msg:
pass
+_next_msgid = 0
+
+def nextid():
+ global _next_msgid
+ return ++_next_msgid
+
def packMsg(ty, params):
"""Pack a message.
Any 'mac' parameter is passed in as an int[6] array and converted.
returns xu message
"""
- if DEBUG: print '>packMsg', ty, params
+ msgid = nextid()
+ if DEBUG: print '>packMsg', msgid, ty, params
(major, minor) = msg_formats[ty]
args = {}
for (k, v) in params.items():
if DEBUG:
for (k, v) in args.items():
print 'packMsg>', k, v, type(v)
- msgid = 0
msg = xu.message(major, minor, msgid, args)
return msg
args['mac'] = mac
for k in macs:
del args[k]
- if DEBUG: print '<unpackMsg', ty, args
+ if DEBUG:
+ msgid = msg.get_header()['id']
+ print '<unpackMsg', msgid, ty, args
return args
def msgTypeName(ty, subty):
import random
from twisted.internet import defer
+defer.Deferred.debug = 1
from xen.xend import sxp
from xen.xend import PrettyPrint
self.majorTypes = [ CMSG_NETIF_BE ]
self.subTypes = {
- CMSG_NETIF_BE_CREATE : self.recv_be_create,
- CMSG_NETIF_BE_CONNECT: self.recv_be_connect,
+ #CMSG_NETIF_BE_CREATE : self.recv_be_create,
+ #CMSG_NETIF_BE_CONNECT: self.recv_be_connect,
CMSG_NETIF_BE_DRIVER_STATUS_CHANGED: self.recv_be_driver_status_changed,
}
self.attached = 1
"""
return self.dom
- def recv_be_create(self, msg, req):
- self.callDeferred(0)
-
- def recv_be_connect(self, msg, req):
+ def respond_be_connect(self, msg):
val = unpackMsg('netif_be_connect_t', msg)
dom = val['domid']
vif = val['netif_handle']
if netif:
netif.send_interface_connected(vif)
else:
- print "recv_be_connect> unknown vif=", vif
+ print "respond_be_connect> unknown vif=", vif
pass
def recv_be_driver_status_changed(self, msg, req):
def cb_destroy(val):
self.controller.send_be_destroy(self.vif)
self.down()
- d = self.controller.factory.addDeferred()
+ #d = self.controller.factory.addDeferred()
+ d = defer.Deferred()
d.addCallback(cb_destroy)
- self.controller.send_be_disconnect(self.vif)
+ self.controller.send_be_disconnect(self.vif, response=d)
class NetifController(controller.Controller):
@param vmac mac address (string)
"""
self.addDevice(vif, vmac)
+ d = defer.Deferred()
if recreate:
- d = defer.Deferred()
d.callback(self)
else:
- d = self.factory.addDeferred()
- self.send_be_create(vif)
+ self.send_be_create(vif, response=d)
return d
def reattach_devices(self):
"""Reattach all devices when the back-end control domain has changed.
"""
- d = self.factory.addDeferred()
+ #d = self.factory.addDeferred()
self.send_be_create(vif)
- self.attach_fe_devices(0)
+ self.attach_fe_devices()
def attach_fe_devices(self):
for dev in self.devices.values():
'evtchn' : dev.evtchn['port1'],
'tx_shmem_frame' : val['tx_shmem_frame'],
'rx_shmem_frame' : val['rx_shmem_frame'] })
- self.factory.writeRequest(msg)
+ d = defer.Deferred()
+ d.addCallback(self.factory.respond_be_connect)
+ self.factory.writeRequest(msg, response=d)
- def send_interface_connected(self, vif):
+ def send_interface_connected(self, vif, response=None):
dev = self.devices[vif]
msg = packMsg('netif_fe_interface_status_changed_t',
{ 'handle' : dev.vif,
'status' : NETIF_INTERFACE_STATUS_CONNECTED,
'evtchn' : dev.evtchn['port2'],
'mac' : dev.mac })
- self.writeRequest(msg)
+ self.writeRequest(msg, response=response)
- def send_be_create(self, vif):
+ def send_be_create(self, vif, response=None):
dev = self.devices[vif]
msg = packMsg('netif_be_create_t',
{ 'domid' : self.dom,
'netif_handle' : dev.vif,
'mac' : dev.mac })
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)
- def send_be_disconnect(self, vif):
+ def send_be_disconnect(self, vif, response=None):
dev = self.devices[vif]
msg = packMsg('netif_be_disconnect_t',
{ 'domid' : self.dom,
'netif_handle' : dev.vif })
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)
- def send_be_destroy(self, vif):
+ def send_be_destroy(self, vif, response=None):
PrettyPrint.prettyprint(self.sxpr())
dev = self.devices[vif]
del self.devices[vif]
msg = packMsg('netif_be_destroy_t',
{ 'domid' : self.dom,
'netif_handle' : vif })
- self.factory.writeRequest(msg)
+ self.factory.writeRequest(msg, response=response)